package com.hanxiaozhang.delayoperation.timewheel;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;


/**
 * 功能描述: <br>
 * 〈定时器实现〉
 *
 * 它在TimeWheel的基础上添加了执行到期任务、阻塞等待最近到期任务的功能
 *
 * @Author: hanxinghua
 * @Date: 2024/2/26
 */
@Slf4j
public class SystemTimer implements Timer {


    private final String executorName;

    /**
     * 表示一个槽(桶) bucket 所代表的时间范围
     */
    private final Long tickMs;

    /**
     * 时间轮有多少个槽(桶) bucket
     */
    private final Integer wheelSize;

    /**
     * 时间轮的开始时间
     */
    private final Long startMs;

    /**
     * 执行到期任务的线程池
     */
    private final ExecutorService taskExecutor;

    /**
     * 各个层级的时间轮共用的DelayQueue队列
     * 主要作用是阻塞推进时间轮表针的线程（ExpiredOperationReaper），等待最近到期任务到期
     *
     * Kafka 需要依靠这个队列获取那些已过期的 Bucket，并清除它们
     */
    private final DelayQueue<TimerTaskList> delayQueue;

    /**
     * 各个层级时间轮共用的任务个数计数器
     */
    private final AtomicInteger taskCounter;

    /**
     * 层级时间轮中最底层的时间轮
     */
    private final TimingWheel timingWheel;

    /**
     * TimerTaskEntry重新提交到 执行到期任务的线程池 去 执行
     */
    private final Consumer<TimerTaskEntry> reinsert = timerTaskEntry -> addTimerTaskEntry(timerTaskEntry);

    /**
     * 用来同步时间轮表针currentTime修改的读写锁
     */
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    public SystemTimer(String executorName){
        this(executorName,null,null,null);
    }

    public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {
        this.executorName = executorName;
        this.tickMs = tickMs != null ? tickMs : 1L;
        this.wheelSize = wheelSize != null ? wheelSize : 20;
        this.startMs = startMs != null ? startMs : TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        this.taskExecutor = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable, "executor-" + executorName);
            thread.setDaemon(false);
            thread.setUncaughtExceptionHandler((t, e) -> log.error("Uncaught errors in thread '" + t.getName() + "':", e));
            return thread;
        });
        this.delayQueue = new DelayQueue<>();
        this.taskCounter = new AtomicInteger(0);
        this.timingWheel = new TimingWheel(this.tickMs, this.wheelSize, this.startMs, taskCounter, delayQueue);
    }

    /**
     * 添加任务
     * 如果任务已经到期，则将任务提交到taskExecutor中执行
     * 如果任务未到期，则调用TimeWheel.add()方法提交到时间轮中等待到期后执行
     *
     * @param timerTask the task to add
     */
    @Override
    public void add(TimerTask timerTask) {
        readLock.lock();
        try {
            addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.getDelayMs() + TimeUnit.NANOSECONDS.toMillis(System.nanoTime())));
        } finally {
            readLock.unlock();
        }
    }

    /**
     * 时间轮表针的推进，同时对到期的TimerTaskList中的任务进行处理。
     *
     * Tips：使用DelayQueue，一直阻塞到最近到期任务到期的任务，避免了空轮休
     *
     * @param timeoutMs 延迟队列弹出元素的超时时间
     * @return
     */
    @Override
    public Boolean advanceClock(Long timeoutMs) {
        // 阻塞等待
        TimerTaskList bucket = null;
        try {
            bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("InterruptedException occurrs when execute advanceClock method", e);
        }
        // 在阻塞期间，有TimerTaskList到期
        if (bucket != null) {
            writeLock.lock();
            try {
                // 循环处理所有的bucket
                while (bucket != null) {
                    // 推进时间轮表针
                    timingWheel.advanceClock(bucket.getExpiration());
                    // 清空链表中的所有元素，并执行节点中的逻辑
                    bucket.flush(reinsert);
                    // 继续处理下一个，非阻塞方式
                    bucket = delayQueue.poll();
                }
            } finally {
                writeLock.unlock();
            }
            return true;
        } else {
            return false;
        }
    }

    @Override
    public Integer size() {
        return taskCounter.get();
    }

    @Override
    public void shutdown() {
        taskExecutor.shutdown();
    }

    /**
     * 添加TimerTaskEntry
     *
     * @param timerTaskEntry
     */
    private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
        // 向时间轮提交添加任务失败，此时任务可能已到期或已取消
        if (!timingWheel.add(timerTaskEntry)) {
            // 如果任务不是已取消状态，即是已到期任务，将到期任务提交到taskExecutor执行
            if (!timerTaskEntry.cancelled()) {
                taskExecutor.submit(timerTaskEntry.getTimerTask());
            }
        }
    }
}
